import pathlib
import time
import pandas as pd
import logging
import rqdatac
import json
from easydict import EasyDict


def refresh_conbond(start_date: str, end_date: str, freq: str,
                    cache_dir: pathlib.Path):
    """Download and cache convertible-bond data from RiceQuant (rqdatac).

    Each dataset (instruments, call/put info, conversion data, indicators,
    bond/stock prices, industry) is cached as a CSV under ``cache_dir``.  A
    dataset is fetched from rqdata only when its cache file is missing, so
    deleting a file forces a refresh of that dataset on the next run.

    Args:
        start_date: inclusive start date, e.g. ``'2018-01-01'``.
        end_date: inclusive end date.
        freq: bar frequency.  Only ``'1d'`` is currently supported (see the
            assert below); the monthly-chunk branches for intraday data are
            retained but unreachable.
        cache_dir: directory holding all cache CSV files.

    Returns:
        Tuple ``(instruments, bars)`` where ``instruments`` is
        ``df_all_instruments`` joined with an ``industry`` column, and
        ``bars`` is the merged per-bar frame indexed by
        ``(datetime, order_book_id)``.
    """
    auth_file = pathlib.Path('~/.cache/quant/.auth.json').expanduser()
    # read_text() closes the file; the previous open().read() leaked the
    # handle.
    cred = EasyDict(json.loads(auth_file.read_text()))
    rqdatac.init(cred.ricequant.username, cred.ricequant.password)

    f_all_instruments = cache_dir.joinpath('all_instruments.csv')
    if f_all_instruments.exists():
        df_all_instruments = pd.read_csv(f_all_instruments,
                                         index_col=['order_book_id'])
    else:
        logging.info('Read all_instruments from rqdata')
        df_all_instruments = rqdatac.convertible.all_instruments()
        # Constant columns expected by the downstream backtest framework
        # (convertibles trade like stocks with a round lot of 10, T+0).
        df_all_instruments['trading_hours'] = '09:31-11:30,13:01-15:00'
        df_all_instruments['board_type'] = 'MainBoard'
        df_all_instruments['type'] = 'CS'
        df_all_instruments['market_tplus'] = 0
        df_all_instruments['round_lot'] = 10
        df_all_instruments['account_type'] = 'STOCK'
        df_all_instruments.set_index('order_book_id', inplace=True)

        logging.info('Read call_info from rqdata')
        # data size is small, always get full data
        df_call_info = rqdatac.convertible.get_call_info(
            df_all_instruments.index.tolist(),
            start_date='2017-01-01',
            end_date=end_date).reset_index().set_index('order_book_id')
        df_call_info.to_csv(cache_dir.joinpath('call_info.csv'))
        # Bonds without a listed_date are pushed to the far future so date
        # comparisons elsewhere treat them as not yet listed.
        df_all_instruments.fillna(
            {
                'listed_date': '2099-12-31'
            },
            inplace=True)
        df_all_instruments = df_all_instruments.join(df_call_info)
        df_all_instruments.to_csv(f_all_instruments)

    f_conversion_info = cache_dir.joinpath('conversion_info.csv')
    if not f_conversion_info.exists():
        logging.info('Read conversion_info from rqdata')
        df_conversion_info = rqdatac.convertible.get_conversion_info(
            df_all_instruments.index.tolist(),
            start_date=start_date,
            end_date=end_date)
        df_conversion_info.to_csv(f_conversion_info)

    f_suspended = cache_dir.joinpath('suspended.csv')
    if not f_suspended.exists():
        logging.info('Read suspended from rqdata')
        df_suspended = rqdatac.convertible.is_suspended(
            df_all_instruments.index.tolist(),
            start_date=start_date,
            end_date=end_date)
        df_suspended['datetime'] = pd.to_datetime(df_suspended.index)
        df_suspended.to_csv(f_suspended, index=False)

    f_indicators = cache_dir.joinpath('indicators.csv')
    if f_indicators.exists():
        df_indicators = pd.read_csv(f_indicators,
                                    parse_dates=['datetime'],
                                    index_col=['datetime', 'order_book_id'])
    else:
        logging.info('Read indicators from rqdata')
        df_indicators = rqdatac.convertible.get_indicators(
            df_all_instruments.index.tolist(),
            start_date=start_date,
            end_date=end_date)
        # Normalize rqdata's 'date' column into a 'datetime' index so the
        # cached CSV and the fresh fetch share one schema.
        df_indicators = df_indicators.reset_index()
        df_indicators['datetime'] = pd.to_datetime(df_indicators.date)
        df_indicators.drop(columns=['date'], inplace=True)
        df_indicators.set_index(['datetime', 'order_book_id'], inplace=True)
        df_indicators.to_csv(f_indicators)

    f_conversion_price = cache_dir.joinpath('conversion_price.csv')
    if not f_conversion_price.exists():
        logging.info('Read conversion_price from rqdata')
        df_conversion_price = rqdatac.convertible.get_conversion_price(
            df_all_instruments.index.tolist(),
            # data size is small, always get full data
            start_date='2017-01-01',
            end_date=end_date)
        df_conversion_price.to_csv(f_conversion_price)

    f_put_info = cache_dir.joinpath('put_info.csv')
    if not f_put_info.exists():
        logging.info('Read put_info from rqdata')
        df_put_info = rqdatac.convertible.get_put_info(
            df_all_instruments.index.tolist(),
            # data size is small, always get full data
            start_date='2017-01-01',
            end_date=end_date)
        df_put_info.to_csv(f_put_info)

    # Only daily bars are supported today.  Kept as an assert (rather than a
    # raise) for backward compatibility with callers; note asserts are
    # stripped under `python -O`.
    assert freq == '1d'
    f_bond_price = cache_dir.joinpath('bond_price_%s.csv' % freq)
    if freq == '1d':
        if f_bond_price.exists():
            df_bond_price = pd.read_csv(f_bond_price,
                                        parse_dates=['datetime'],
                                        index_col=['order_book_id'])
        else:
            logging.info('Read bond_price from rqdata')
            df_bond_price = rqdatac.get_price(
                df_all_instruments.index.tolist(),
                start_date=start_date,
                end_date=end_date,
                frequency=freq).reset_index()
            df_bond_price['datetime'] = pd.to_datetime(df_bond_price.date)
            df_bond_price.drop(columns=['date'], inplace=True)
            df_bond_price.set_index('order_book_id', inplace=True)
            df_bond_price.to_csv(f_bond_price)
    else:
        # Intraday data is large: fetch month-by-month with a cache file per
        # month.  (Currently unreachable because of the assert above.)
        ed = pd.Timestamp(end_date)
        dr = list(pd.date_range(pd.Timestamp(start_date), ed, freq='M'))
        # Bug fix: the original compared ed against the list slice dr[:-1]
        # (always unequal), appending a duplicate end date.  Compare against
        # the last element instead.
        if not dr or dr[-1] != ed:
            dr.append(ed)
        start = start_date
        ndfs = []
        for end in dr:
            f = cache_dir.joinpath('bond_price_%s_%02d.csv' %
                                   (end.year, end.month))
            if f.exists():
                logging.info('Found %s' % f.absolute())
                # Bug fix: cached months were previously skipped, leaving
                # gaps in the concat below (and crashing pd.concat([]) when
                # every month was cached).
                # NOTE(review): read_csv yields a flat frame while a fresh
                # fetch is MultiIndexed -- confirm downstream alignment.
                ndfs.append(pd.read_csv(f))
            else:
                logging.info('%s - %s' % (start, end))
                ndf = rqdatac.get_price(df_all_instruments.index.tolist(),
                                        start_date=start,
                                        end_date=end,
                                        frequency=freq)
                ndf.to_csv(f)
                ndfs.append(ndf)
                # Be polite to the data provider between chunked requests.
                time.sleep(1)
            start = end + pd.Timedelta('1d')
        df_bond_price = pd.concat(ndfs)

    f_stock_price = cache_dir.joinpath('stock_price_%s.csv' % freq)
    if freq == '1d':
        if f_stock_price.exists():
            df_stock_price = pd.read_csv(f_stock_price,
                                         parse_dates=['datetime'])
        else:
            logging.info('Read stock_price from rqdata')
            df_stock_price = rqdatac.get_price(
                df_all_instruments.stock_code.tolist(),
                start_date=start_date,
                end_date=end_date,
                frequency=freq).reset_index()
            df_stock_price['datetime'] = pd.to_datetime(df_stock_price.date)
            df_stock_price.drop(columns=['date'], inplace=True)
            df_stock_price.to_csv(f_stock_price, index=False)
    else:
        # Same monthly-chunk strategy as bond prices above (unreachable).
        ed = pd.Timestamp(end_date)
        dr = list(pd.date_range(pd.Timestamp(start_date), ed, freq='M'))
        # Bug fix: dr[:-1] -> dr[-1], as in the bond-price branch.
        if not dr or dr[-1] != ed:
            dr.append(ed)
        start = start_date
        ndfs = []
        for end in dr:
            f = cache_dir.joinpath('stock_price_%s_%02d.csv' %
                                   (end.year, end.month))
            if f.exists():
                logging.info('Found %s' % f.absolute())
                # Bug fix: load cached months instead of dropping them.
                # NOTE(review): index schema may differ from a fresh fetch.
                ndfs.append(pd.read_csv(f))
            else:
                logging.info('%s - %s' % (start, end))
                ndf = rqdatac.get_price(df_all_instruments.stock_code.tolist(),
                                        start_date=start,
                                        end_date=end,
                                        frequency=freq)
                ndf.to_csv(f)
                time.sleep(1)
                ndfs.append(ndf)
            start = end + pd.Timedelta('1d')
        df_stock_price = pd.concat(ndfs)

    f_instrument_industry = cache_dir.joinpath('instrument_industry.csv')
    if not f_instrument_industry.exists():
        logging.info('Read instrument_industry from rqdata')
        df_instrument_industry = rqdatac.convertible.get_instrument_industry(
            df_all_instruments.index.tolist(), level=0)

        # Retry once for instruments that came back without an industry;
        # assert the retry fully resolved them.
        df1 = df_all_instruments.join(df_instrument_industry)
        df2 = df1[pd.isna(df1.first_industry_name)]
        if not df2.empty:
            df3 = rqdatac.convertible.get_instrument_industry(
                df2.index.tolist(), level=0)
            if df3 is not None:
                assert df3[pd.isna(df3.first_industry_name)].empty
                df_instrument_industry = pd.concat(
                    [df_instrument_industry, df3])
        else:
            logging.info('No NA instrument_industry')
        df_instrument_industry.to_csv(f_instrument_industry)
    else:
        df_instrument_industry = pd.read_csv(f_instrument_industry)

    # volume from the data source is in terms of 1000
    df_bond_price['volume'] = df_bond_price.volume * 10

    # Add stock_code for joining
    logging.info("Adding stock_code")
    ndf = df_bond_price.join(df_all_instruments[['stock_code']])

    # Add stock_price column
    logging.info("Adding stock_price")
    df_stock_price.rename(columns={
        'order_book_id': 'stock_code',
        'open': 'stock_open',
        'close': 'stock_close',
    },
        inplace=True,
        errors='raise')

    ndf = ndf.reset_index().merge(
        df_stock_price[['stock_code', 'datetime', 'stock_open',
                        'stock_close']],
        on=['stock_code', 'datetime'],
        how='left')
    ndf.drop(columns=['stock_code'], inplace=True)
    ndf.sort_values(['datetime', 'order_book_id'], inplace=True)
    ndf.set_index(['datetime', 'order_book_id'], inplace=True)

    # Add columns from indicators
    logging.info("Adding indicators")
    # Use left join. e.g. 113034.XSHG is missing indicator for 2022-01-10 (last trading day)
    ndf = ndf.join(df_indicators[[
        'conversion_value', 'yield_to_maturity', 'remaining_size',
        'double_low_factor', 'conversion_premium', 'call_qualified_days',
        'put_qualified_days'
    ]],
        how='left')
    # 113010.XSHG, 2019-02-13, nan indicators
    ndf.loc[:,
            ['conversion_premium']] = ndf.loc[:,
                                              ['conversion_premium']].ffill()

    logging.info("Rename")
    ndf.rename(columns={
        'conversion_premium': 'convert_premium_rate',
    },
        inplace=True,
        errors='raise')

    logging.info("Write to csv")
    ndf.to_csv(cache_dir.joinpath('conbonds_%s.csv' % freq))

    df_instrument_industry.rename(columns={'first_industry_name': 'industry'},
                                  inplace=True,
                                  errors='raise')

    return df_all_instruments.join(df_instrument_industry[['industry']]), ndf
